package com.xiaofan.java;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.CollectionEnvironment;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * use of the Table API for a Word Count in Java.
 */
public class WordCountSQL_E0003 {
    public static void main(String[] args) throws Exception {
        CollectionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        DataSet<WordCountSQL_E0002.WC> input = env.fromElements(
                new WordCountSQL_E0002.WC("Hello", 1),
                new WordCountSQL_E0002.WC("Hi", 1),
                new WordCountSQL_E0002.WC("Hello", 1)
        );

        Table table = tEnv.fromDataSet(input);
        tEnv.createTemporaryView("WordCount", table);

        Table filtered = tEnv.from("WordCount")
                .groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"))
                .filter($("frequency").isEqual(2));

        DataSet<WC> result = tEnv.toDataSet(filtered, WC.class);

        result.print();


    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class WC {
        public String word;
        public long frequency;

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }
}
